package com.webflux.service;

import com.alibaba.fastjson.JSONObject;
import com.webflux.entity.ElasticSearchEntity;
import com.webflux.entity.UserEntity;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;


@Service
public class ElasticSearchService {

    // Low-level Elasticsearch REST client; used by the methods in this class that
    // build index/get/search/bulk requests by hand.
    @Autowired
    private RestHighLevelClient client;

    // Spring Data Elasticsearch template; used by findMatch and queryPage.
    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;
    // NOTE(review): raw-typed repository. No live code in this class references it
    // (only commented-out code does) - confirm whether it is still needed.
    @Autowired
    private ElasticsearchRepository elasticsearchRepository;


    /**
     * Creates the {@code t_user} index with default request options and prints
     * the client and the create-index response to standard output.
     *
     * <p>The injected {@link RestHighLevelClient} is intentionally NOT closed here.
     * It is a shared, container-injected instance; closing it inside one method
     * would make every later Elasticsearch call through this service fail.
     *
     * @throws IOException if the request to Elasticsearch fails
     * @author suewong
     */
    public void createIndex() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("t_user");
        System.out.println(client);

        // RequestOptions.DEFAULT is the client's built-in default option set.
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println(response);
    }


    /**
     * Indexes a single document into the given index.
     *
     * <p>The entity is serialized to JSON with fastjson and sent as the document
     * source. No document id is set on the request. The response and its status
     * are printed to standard output.
     *
     * @param indexName name of the index to write to
     * @param object    entity to serialize and store
     * @throws IOException if the request to Elasticsearch fails
     * @author suewong
     */
    public void createDocument(String indexName, ElasticSearchEntity object) throws IOException {
        // XContentType.JSON tells the client that the source string is JSON.
        IndexRequest indexRequest = new IndexRequest(indexName)
                .source(JSONObject.toJSONString(object), XContentType.JSON);

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(indexResponse);
        System.out.println(indexResponse.status());
    }


    /**
     * Looks up one hard-coded document id in the {@code t_user} index and, when it
     * exists, prints its source string followed by the full response.
     *
     * @throws IOException if either request to Elasticsearch fails
     */
    public void find() throws IOException {
        final GetRequest lookup = new GetRequest("t_user", "Ny11tnUB9yeylRfcFeeg");

        // Guard clause: nothing to print when the document is absent.
        if (!client.exists(lookup, RequestOptions.DEFAULT)) {
            return;
        }

        final GetResponse found = client.get(lookup, RequestOptions.DEFAULT);
        System.out.println(found.getSourceAsString());
        System.out.println(found);
    }

    /**
     * Runs a term query ({@code phone} = {@code 11012345}) against the
     * {@code t_user} index and prints the source map of every returned hit.
     *
     * @throws IOException if the search request fails
     * @author suewong
     */
    public void searchDocument() throws IOException {
        SearchSourceBuilder source = new SearchSourceBuilder()
                .query(QueryBuilders.termQuery("phone", "11012345"));
        SearchRequest searchRequest = new SearchRequest("t_user").source(source);

        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);

        // Dump each hit's source for inspection.
        Arrays.stream(result.getHits().getHits())
                .forEach(hit -> System.out.println(
                        "查询结果：" + hit.getSourceAsMap() + "===================================="));
    }


    /**
     * Runs a wildcard query (pattern {@code *哀*} on the {@code phone} field)
     * against the {@code t_user} index and prints the source map of every
     * returned hit.
     *
     * @throws IOException if the search request fails
     * @author suewong
     */
    public void queryMatch() throws IOException {
        WildcardQueryBuilder wildcard = QueryBuilders.wildcardQuery("phone", "*哀*");

        SearchRequest searchRequest = new SearchRequest("t_user");
        searchRequest.source(new SearchSourceBuilder().query(wildcard));

        SearchHit[] matches = client.search(searchRequest, RequestOptions.DEFAULT)
                .getHits()
                .getHits();

        // Dump each hit's source for inspection.
        for (int i = 0; i < matches.length; i++) {
            System.out.println("查询结果：" + matches[i].getSourceAsMap() + "====================================");
        }
    }


//    public void search2() {
//        SearchRequestBuilder srb = client.search("book").setTypes("kehuan");
//        SearchResponse sr = srb.setQuery(QueryBuilders.multiMatchQuery("美国宇宙红岸", "title", "content")
//                .analyzer(ANALYZER))
//                .setFetchSource(new String[]{"title", "price"}, null)
//                .execute()
//                .actionGet();
//        SearchHits hits = sr.getHits();
//        for (SearchHit hit : hits) {
//            System.out.println(hit.getSourceAsString());
//        }
//    }

    /**
     * Paged keyword search - not implemented yet.
     *
     * <p>Returns an empty stream instead of {@code null} so that callers can
     * subscribe to or chain operators on the result without a null check.
     *
     * @param keyWord  search keyword (currently unused)
     * @param pageNum  requested page number (currently unused)
     * @param pageSize requested page size (currently unused)
     * @return an empty {@link Flux}; never {@code null}
     */
    public Flux<UserEntity> queryMatchDivided(String keyWord, Integer pageNum, Integer pageSize) {
        // TODO: implement. The earlier draft built a bool query with a match on
        // "username" and intended to run it through the repository.
        return Flux.empty();
    }


    /**
     * Empty placeholder method; it performs no work.
     *
     * <p>NOTE(review): nothing in this file calls it - confirm whether it can be
     * removed. The mapping-creation logic lives in {@code createMapping}.
     */
    public void test() {

    }


    /**
     * Sends a put-mapping request that declares an analyzed {@code username}
     * field for the given index.
     *
     * <p>The injected {@link RestHighLevelClient} is intentionally NOT closed here.
     * It is a shared, container-injected instance; closing it inside one method
     * would make every later Elasticsearch call through this service fail.
     *
     * <p>NOTE(review): the mapping body uses field type {@code "string"} and
     * analyzer {@code "ik"}, and wraps {@code properties} under the index name
     * rather than under {@code mappingType}. These look like they target an
     * older Elasticsearch / IK plugin layout - verify against the server
     * version actually deployed before relying on this method.
     *
     * @param indices     name of the index whose mapping is updated; also used as
     *                    the wrapper object name in the mapping body
     * @param mappingType mapping type set on the request
     * @throws Exception if building the mapping body or sending the request fails
     */
    public void createMapping(String indices, String mappingType) throws Exception {
        final XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .startObject(indices).startObject("properties")
                .startObject("username").field("type", "string")
                .field("analyzer", "ik").field("search_analyzer", "ik_smart").endObject()
                .endObject().endObject().endObject();

        final PutMappingRequest request = Requests.putMappingRequest(indices)
                .type(mappingType)
                .source(builder);

        client.indices().putMapping(request, RequestOptions.DEFAULT);
    }


    /**
     * Bulk-indexes the given users into the {@code t_user} index.
     *
     * <p>Each entity is serialized to JSON with fastjson and added as one index
     * action; the index name comes from the bulk request itself. A {@code null}
     * or empty list is a no-op, because a bulk request without any action is
     * rejected by the client before it is sent. The response and its status are
     * printed to standard output; per-item failures, which do not raise an
     * exception, are reported on standard error.
     *
     * @param objects users to index; may be {@code null} or empty
     * @throws IOException if the bulk request fails
     * @author suewong
     */
    public void batchCreate(List<UserEntity> objects) throws IOException {
        if (objects == null || objects.isEmpty()) {
            return;
        }

        BulkRequest request = new BulkRequest("t_user");
        for (UserEntity item : objects) {
            request.add(new IndexRequest().source(JSONObject.toJSONString(item), XContentType.JSON));
        }

        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
        System.out.println(response.toString());
        System.out.println(response.status());

        // A bulk call can succeed overall while individual items fail.
        if (response.hasFailures()) {
            System.err.println(response.buildFailureMessage());
        }
    }


    /**
     * Full-text search on the {@code username} field of the {@code t_user}
     * index, using the {@code ik_max_word} analyzer for the query text.
     *
     * <p>The search is restricted to {@code t_user}, the index the other
     * methods in this class use. A request that names no index is not limited
     * to {@code t_user}, yet every hit is deserialized as a {@link UserEntity}.
     *
     * <p>The search itself runs synchronously inside this call; only the
     * conversion of hits into entities happens when the returned stream is
     * subscribed to. Each hit's source map is printed to standard output.
     *
     * @param keyword text to match against {@code username}
     * @return a stream of the matching users, deserialized from each hit's JSON source
     * @throws IOException if the search request fails
     * @author suewong
     */
    public Flux<UserEntity> findAll(String keyword) throws IOException {
        MatchQueryBuilder usernameQuery = QueryBuilders.matchQuery("username", keyword).analyzer("ik_max_word");

        SearchRequest searchRequest = new SearchRequest("t_user")
                .source(new SearchSourceBuilder().query(usernameQuery));
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        SearchHit[] hits = searchResponse.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println("查询结果：" + hit.getSourceAsMap() + "====================================");
        }

        // Emit one entity per hit, parsed from the hit's JSON source.
        return Flux.fromArray(hits)
                .map(hit -> JSONObject.parseObject(hit.getSourceAsString(), UserEntity.class));
    }


    /**
     * Multi-field keyword search over the {@code t_user} index.
     *
     * <p>Uses the dis_max query from {@link #ceateQueryBuilder(String)}, runs it
     * through the Spring Data template, prints the content of every hit to
     * standard output, and returns the hit contents as a stream.
     *
     * @param keyword text to search for
     * @return a stream of the matching users
     * @author suewong
     */
    public Flux<UserEntity> findMatch(String keyword) {
        NativeSearchQuery nativeQuery = new NativeSearchQueryBuilder()
                .withQuery(ceateQueryBuilder(keyword))
                .build();
        SearchHits<UserEntity> results =
                elasticsearchRestTemplate.search(nativeQuery, UserEntity.class, IndexCoordinates.of("t_user"));

        // Dump each hit's content for inspection.
        results.forEach(hit ->
                System.out.println("查询结果：" + hit.getContent() + "===================================="));

        // Unwrap each hit into its entity and emit them one by one.
        return Flux.fromIterable(results.getSearchHits())
                .map(hit -> hit.getContent());
    }

    /**
     * Builds a dis_max query that matches the keyword against the
     * {@code username} and {@code alias} fields.
     *
     * <p>dis_max is used so that the score of the best-scoring clause is taken
     * as the score of the document.
     *
     * @param keyword text to match in both fields
     * @return the dis_max query holding the two match clauses
     */
    public DisMaxQueryBuilder ceateQueryBuilder(String keyword) {
        return QueryBuilders.disMaxQuery()
                .add(QueryBuilders.matchQuery("username", keyword))
                .add(QueryBuilders.matchQuery("alias", keyword));
    }


    /**
     * Paged multi-field keyword search over the {@code t_user} index.
     *
     * <p>Uses the dis_max query from {@link #ceateQueryBuilder(String)} together
     * with a page request built from {@code page} and {@code size}. The content
     * of every hit on the page is printed to standard output together with the
     * total hit count, and the hit contents are returned as a stream.
     *
     * @param keyword text to search for
     * @param page    page index passed to {@code PageRequest.of} (zero-based there)
     * @param size    number of results per page
     * @return a stream of the users on the requested page
     */
    public Flux<UserEntity> queryPage(String keyword, Integer page, Integer size) {
        // Build the page window first, as the original did, so argument errors surface in the same order.
        Pageable pageWindow = PageRequest.of(page, size);

        NativeSearchQuery pagedQuery = new NativeSearchQueryBuilder()
                .withQuery(ceateQueryBuilder(keyword))
                .withPageable(pageWindow)
                .build();
        SearchHits<UserEntity> results =
                elasticsearchRestTemplate.search(pagedQuery, UserEntity.class, IndexCoordinates.of("t_user"));

        // Dump each hit's content, followed by the overall hit count.
        results.forEach(hit -> System.out.println(
                "查询结果：" + hit.getContent() + "====================================" + results.getTotalHits()));

        // Unwrap each hit into its entity and emit them one by one.
        return Flux.fromIterable(results.getSearchHits())
                .map(hit -> hit.getContent());
    }
}
